package com.example.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
 * RabbitMQ subscriber for the {@code my_boot_potic_queue} queue, using manual
 * acknowledgement and consumer-side idempotency.
 *
 * <p>Project: RabbitMqLearn. Created 2023-02-04 17:20.
 *
 * <p>Duplicate detection is keyed on the
 * {@code spring_returned_message_correlation} header. Ids that have been
 * consumed are remembered in this instance only, for
 * {@link #DEDUP_TTL_MILLIS} milliseconds.
 *
 * <p>NOTE(review): the in-memory store is not shared between consumer
 * instances or across restarts. If several instances read the same queue,
 * back {@link #tryMarkConsumed(String)} with a shared store instead (the
 * earlier draft sketched a Redis {@code setIfAbsent} with the same expiry).
 *
 * @author wang fei
 * @version 1.0
 */
@Component
public class MyConsumer {

    /** Name of the header that carries the business id of a message. */
    private static final String MESSAGE_ID_HEADER = "spring_returned_message_correlation";

    /** How long a message id keeps counting as "already consumed". */
    private static final long DEDUP_TTL_MILLIS = TimeUnit.MILLISECONDS.toMillis(100000);

    /** Message id mapped to the epoch-millis instant at which it expires. */
    private final ConcurrentMap<String, Long> consumedIds = new ConcurrentHashMap<>();

    /**
     * Listener callback invoked for each message on {@code my_boot_potic_queue}.
     *
     * <p>The first delivery of a message id is consumed and acknowledged.
     * A delivery whose id is still remembered is reported as a duplicate and
     * rejected without requeue. The message body is printed in both cases.
     *
     * @param message the delivered message; its body is decoded as UTF-8
     * @param channel the channel the message arrived on, used for ack/reject
     * @throws IOException if acknowledging or rejecting the delivery fails
     * @author wang fei
     */
    @RabbitListener(queues = "my_boot_potic_queue")
    public void receive(Message message, Channel channel) throws IOException {
        byte[] msg = message.getBody();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        // Read the header as Object so a non-String value does not cause a
        // ClassCastException at the assignment.
        Object header = message.getMessageProperties().getHeader(MESSAGE_ID_HEADER);
        String messageId = header == null ? null : header.toString();

        if (tryMarkConsumed(messageId)) {
            // First delivery of this id: business consumption goes here,
            // followed by the manual ack.
            channel.basicAck(deliveryTag, false);
        } else {
            // Id already consumed within the TTL window: skip consumption and
            // reject without requeue so the duplicate is not redelivered.
            System.out.println("已重复消费");
            channel.basicReject(deliveryTag, false);
        }
        System.out.println("收到消息：" + new String(msg, StandardCharsets.UTF_8));
    }

    /**
     * Atomically records {@code messageId} as consumed if it is not already
     * remembered.
     *
     * @param messageId the business id of the message, possibly {@code null}
     * @return {@code true} if the caller should consume the message,
     *         {@code false} if the id is a duplicate that has not yet expired
     */
    private boolean tryMarkConsumed(String messageId) {
        if (messageId == null) {
            // Without an id there is nothing to deduplicate on; treating the
            // message as new avoids silently dropping it.
            return true;
        }
        long now = System.currentTimeMillis();
        // Forget expired ids first so the map stays bounded by the number of
        // messages seen within one TTL window.
        consumedIds.values().removeIf(expiresAt -> expiresAt <= now);
        return consumedIds.putIfAbsent(messageId, now + DEDUP_TTL_MILLIS) == null;
    }
}
